diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index 67d0c8280f0..9c202738dfe 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -890,6 +890,9 @@ class QueryConfig { static constexpr const char* kMarkSortedZeroCopyThreshold = "mark_sorted_zero_copy_threshold"; + /// Number of cuDF TopN batches to accumulate before merging. + static constexpr const char* kCudfTopNBatchSize = "cudf_topn_batch_size"; + enum class RowSizeTrackingMode { DISABLED = 0, EXCLUDE_DELTA_SPLITS = 1, diff --git a/velox/experimental/cudf/CudfConfig.h b/velox/experimental/cudf/CudfConfig.h deleted file mode 100644 index 9cd7e21b62c..00000000000 --- a/velox/experimental/cudf/CudfConfig.h +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Copyright (c) Facebook, Inc. and its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include -#include - -namespace facebook::velox::cudf_velox { - -struct CudfConfig { - /// Keys used by the initialize() method. 
- static constexpr const char* kCudfEnabled{"cudf.enabled"}; - static constexpr const char* kCudfDebugEnabled{"cudf.debug_enabled"}; - static constexpr const char* kCudfMemoryResource{"cudf.memory_resource"}; - static constexpr const char* kCudfMemoryPercent{"cudf.memory_percent"}; - static constexpr const char* kCudfFunctionNamePrefix{ - "cudf.function_name_prefix"}; - static constexpr const char* kCudfAstExpressionEnabled{ - "cudf.ast_expression_enabled"}; - static constexpr const char* kCudfAstExpressionPriority{ - "cudf.ast_expression_priority"}; - static constexpr const char* kCudfJitExpressionEnabled{ - "cudf.jit_expression_enabled"}; - static constexpr const char* kCudfJitExpressionPriority{ - "cudf.jit_expression_priority"}; - static constexpr const char* kCudfOutputMr{"cudf.output_mr"}; - static constexpr const char* kCudfAllowCpuFallback{"cudf.allow_cpu_fallback"}; - static constexpr const char* kCudfLogFallback{"cudf.log_fallback"}; - static constexpr const char* kCudfBatchSizeMinThreshold{ - "cudf.batch_size_min_threshold"}; - static constexpr const char* kCudfBatchSizeMaxThreshold{ - "cudf.batch_size_max_threshold"}; - static constexpr const char* kCudfConcatOptimizationEnabled{ - "cudf.concat_optimization_enabled"}; - // The value could be either spark or presto. - static constexpr const char* kCudfFunctionEngine{"cudf.function_engine"}; - - /// Query session configs for the cuDF Operators. - static constexpr const char* kCudfTopNBatchSize{"cudf.topk_batch_size"}; - - /// Singleton CudfConfig instance. - /// Clients must set the configs below before invoking registerCudf(). - static CudfConfig& getInstance(); - - /// Initialize from a map with the above keys. - void initialize(std::unordered_map&&); - - /// Enable cudf by default. - /// Clients can disable here and enable it via the QueryConfig as well. - bool enabled{true}; - - /// Enable debug printing. 
- bool debugEnabled{false}; - - /// Allow fallback to CPU operators if GPU operator replacement fails. - bool allowCpuFallback{true}; - - /// Memory resource for cuDF. - /// Possible values are (cuda, pool, async, arena, managed, managed_pool). - std::string memoryResource{"async"}; - - /// The initial percent of GPU memory to allocate for pool or arena memory - /// resources. - int32_t memoryPercent{50}; - - /// Memory resource for output vectors. When set to a value different from - /// memoryResource, a separate MR is created for output allocations. - /// When empty, the main memoryResource is used. - std::string outputMemoryResource; - - /// Register all the functions with the functionNamePrefix. - std::string functionNamePrefix; - - /// Enable AST in expression evaluation. - bool astExpressionEnabled{true}; - - /// Enable JIT in expression evaluation. - bool jitExpressionEnabled{true}; - - /// Priority of AST expression. Expression with higher priority is chosen for - /// a given root expression. - /// Example: - /// Priority of expression that uses individual cuDF functions is 50. - /// If AST priority is 100 then for a velox expression node that is supported - /// by both, AST will be chosen as replacement for cudf execution, if AST - /// priority is 25 then standalone cudf function is chosen. - int astExpressionPriority{100}; - - /// Priority of JIT expression. - int jitExpressionPriority{101}; - - /// Whether to log a reason for falling back to Velox CPU execution. - bool logFallback{true}; - - /// Whether to insert CudfBatchConcat operators before supported Cudf - /// operators. - /// This can improve performance by reducing the number of cuda kernel - /// launches on addInput of certain operators by collecting a minimum number - /// of rows before concatenating and passing on to the next operator. 
- /// This batch size is determined by batchSizeMinThreshold and - /// batchSizeMaxThreshold - bool concatOptimizationEnabled{false}; - - /// Minimum rows to accumulate before GPU-side concatenation in - /// `CudfBatchConcat` (default 100k). - int32_t batchSizeMinThreshold{100000}; - - /// Maximum rows allowed in a concatenated batch (user configurable). - /// When not set, cuDF's own `size_type::max()` is used. - std::optional batchSizeMaxThreshold; - // Query config key for the TopN batch size in the cuDF TopN operator. - int32_t topNBatchSize{5}; - - // Register the Spark or Presto functions, the value could be either spark or - // presto. - std::string functionEngine{"presto"}; -}; - -} // namespace facebook::velox::cudf_velox diff --git a/velox/experimental/cudf/benchmarks/CudfTpchBenchmark.cpp b/velox/experimental/cudf/benchmarks/CudfTpchBenchmark.cpp index a1fe03deeb7..66956f33a16 100644 --- a/velox/experimental/cudf/benchmarks/CudfTpchBenchmark.cpp +++ b/velox/experimental/cudf/benchmarks/CudfTpchBenchmark.cpp @@ -14,8 +14,8 @@ * limitations under the License. 
*/ -#include "velox/experimental/cudf/CudfConfig.h" #include "velox/experimental/cudf/benchmarks/CudfTpchBenchmark.h" +#include "velox/experimental/cudf/common/CudfSystemConfig.h" #include "velox/experimental/cudf/connectors/hive/CudfHiveConfig.h" #include "velox/experimental/cudf/connectors/hive/CudfHiveTableHandle.h" #include "velox/experimental/cudf/exec/CudfConversion.h" @@ -97,12 +97,13 @@ void CudfTpchBenchmark::initialize() { connector::registerConnector(cudfHiveConnector); } - cudf_velox::CudfConfig::getInstance().memoryResource = - FLAGS_cudf_memory_resource; - cudf_velox::CudfConfig::getInstance().memoryPercent = - FLAGS_cudf_memory_percent; - - cudf_velox::CudfConfig::getInstance().debugEnabled = FLAGS_cudf_debug_enabled; + cudf_velox::CudfSystemConfig::getInstance().updateConfigs( + {{cudf_velox::CudfSystemConfig::kCudfMemoryResource, + FLAGS_cudf_memory_resource}, + {cudf_velox::CudfSystemConfig::kCudfMemoryPercent, + folly::to(FLAGS_cudf_memory_percent)}, + {cudf_velox::CudfSystemConfig::kCudfDebugEnabled, + FLAGS_cudf_debug_enabled ? "true" : "false"}}); // Enable cuDF operators cudf_velox::registerCudf(); diff --git a/velox/experimental/cudf/common/CMakeLists.txt b/velox/experimental/cudf/common/CMakeLists.txt new file mode 100644 index 00000000000..48783157329 --- /dev/null +++ b/velox/experimental/cudf/common/CMakeLists.txt @@ -0,0 +1,17 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+add_library(velox_cudf_config CudfSystemConfig.cpp)
+
+target_link_libraries(velox_cudf_config PRIVATE velox_common_base)
diff --git a/velox/experimental/cudf/common/CudfSystemConfig.cpp b/velox/experimental/cudf/common/CudfSystemConfig.cpp
new file mode 100644
index 00000000000..e2ae8492fa1
--- /dev/null
+++ b/velox/experimental/cudf/common/CudfSystemConfig.cpp
@@ -0,0 +1,63 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "velox/experimental/cudf/common/CudfSystemConfig.h"
+
+#include <algorithm>
+
+namespace facebook::velox::cudf_velox {
+
+CudfSystemConfig::CudfSystemConfig()
+    : velox::config::ConfigBase(
+          std::unordered_map<std::string, std::string>{},
+          true) {}
+
+CudfSystemConfig::CudfSystemConfig(
+    std::unordered_map<std::string, std::string>&& values)
+    : velox::config::ConfigBase(std::move(values), true) {
+  validateConfigs();
+}
+
+CudfSystemConfig& CudfSystemConfig::getInstance() {
+  static CudfSystemConfig instance;
+  return instance;
+}
+
+void CudfSystemConfig::validateConfigs() {
+  // Only validate the memory percent when it was explicitly configured; the
+  // memoryPercent() getter falls back to its default otherwise.
+  if (const auto cudfMemoryPercent = get<int32_t>(kCudfMemoryPercent)) {
+    VELOX_USER_CHECK_GT(
+        *cudfMemoryPercent,
+        0,
+        "cudf.memory_percent must be greater than 0 to initialize cuDF memory resource");
+  }
+}
+
+void CudfSystemConfig::updateConfigs(
+    std::unordered_map<std::string, std::string>&& config) {
+  for (const auto& [key, value] : config) {
+    // TODO(ps): Revert support for legacy config names.
+    auto canonicalKey = key;
+    std::replace(canonicalKey.begin(), canonicalKey.end(), '_', '-');
+    set(canonicalKey, value);
+  }
+  // Apply the same checks as the map constructor so that values installed on
+  // the singleton cannot bypass validation.
+  validateConfigs();
+}
+
+} // namespace facebook::velox::cudf_velox
diff --git a/velox/experimental/cudf/common/CudfSystemConfig.h b/velox/experimental/cudf/common/CudfSystemConfig.h
new file mode 100644
index 00000000000..38138a99dad
--- /dev/null
+++ b/velox/experimental/cudf/common/CudfSystemConfig.h
@@ -0,0 +1,196 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "velox/common/config/Config.h"
+
+#include <optional>
+#include <string>
+#include <unordered_map>
+
+namespace facebook::velox::cudf_velox {
+
+/// System-level configuration for the Velox cuDF backend. Use getInstance()
+/// for the shared instance and the typed getters below to read values; most
+/// getters supply the default that applies when the key has not been set.
+class CudfSystemConfig : public velox::config::ConfigBase {
+ public:
+  /// Default constructor.
+  CudfSystemConfig();
+
+  /// Constructor that initializes config from a map of values.
+  explicit CudfSystemConfig(
+      std::unordered_map<std::string, std::string>&& values);
+
+  /// Enable cudf by default.
+  /// Clients must set the configs below before invoking registerCudf().
+  static constexpr const char* kCudfEnabled = "cudf.enabled";
+
+  /// Enable debug logging.
+  static constexpr const char* kCudfDebugEnabled = "cudf.debug-enabled";
+
+  /// Whether to log a reason for fallback to Velox CPU execution.
+  static constexpr const char* kCudfLogFallback = "cudf.log-fallback";
+
+  /// Memory resource for cuDF.
+  /// Possible values are (cuda, pool, async, arena, managed, managed_pool).
+  static constexpr const char* kCudfMemoryResource = "cudf.memory-resource";
+
+  /// The initial percent of GPU memory to allocate for pool or arena memory
+  /// resources.
+  static constexpr const char* kCudfMemoryPercent = "cudf.memory-percent";
+
+  /// Memory resource for output vectors.
+  static constexpr const char* kCudfOutputMemoryResource = "cudf.output-mr";
+
+  /// Register all the functions with the functionNamePrefix.
+  static constexpr const char* kCudfFunctionNamePrefix =
+      "cudf.function-name-prefix";
+
+  /// Function engine used for registration and signature selection.
+  static constexpr const char* kCudfFunctionEngine = "cudf.function-engine";
+
+  /// Enable AST in expression evaluation.
+  static constexpr const char* kCudfAstExpressionEnabled =
+      "cudf.ast-expression-enabled";
+
+  /// Priority of AST expression. Expression with higher priority is chosen for
+  /// a given root expression.
+  /// Example:
+  /// Priority of expression that uses individual cuDF functions is 50.
+  /// If AST priority is 100 then for a velox expression node that is supported
+  /// by both, AST will be chosen as replacement for cudf execution, if AST
+  /// priority is 25 then standalone cudf function is chosen.
+  static constexpr const char* kCudfAstExpressionPriority =
+      "cudf.ast-expression-priority";
+
+  /// Enable JIT in expression evaluation.
+  static constexpr const char* kCudfJitExpressionEnabled =
+      "cudf.jit-expression-enabled";
+
+  /// Priority of JIT expression.
+  static constexpr const char* kCudfJitExpressionPriority =
+      "cudf.jit-expression-priority";
+
+  /// Allow fallback to CPU operators if GPU operator replacement fails.
+  static constexpr const char* kCudfAllowCpuFallback =
+      "cudf.allow-cpu-fallback";
+
+  /// Whether to insert CudfBatchConcat operators before supported operators.
+  static constexpr const char* kCudfConcatOptimizationEnabled =
+      "cudf.concat-optimization-enabled";
+
+  /// Minimum rows to accumulate before GPU-side concatenation.
+  static constexpr const char* kCudfBatchSizeMinThreshold =
+      "cudf.batch-size-min-threshold";
+
+  /// Maximum rows allowed in a concatenated batch.
+  static constexpr const char* kCudfBatchSizeMaxThreshold =
+      "cudf.batch-size-max-threshold";
+
+  /// Number of TopN batches to accumulate before merging.
+  static constexpr const char* kCudfTopNBatchSize = "cudf.topn-batch-size";
+
+  /// Singleton CudfSystemConfig instance.
+  static CudfSystemConfig& getInstance();
+
+  /// Update config from a map. Supports modern dash-delimited keys and legacy
+  /// underscore-delimited keys.
+  ///
+  /// Example: "cudf.allow_cpu_fallback" and "cudf.allow-cpu-fallback" are
+  /// normalized to canonical internal key "cudf.allow-cpu-fallback".
+  ///
+  /// The configuration is re-validated after the updates are applied.
+  void updateConfigs(std::unordered_map<std::string, std::string>&&);
+
+  /// Individual getter methods for each configuration.
+  bool cudfEnabled() const {
+    return get<bool>(kCudfEnabled, true);
+  }
+
+  bool debugEnabled() const {
+    return get<bool>(kCudfDebugEnabled, false);
+  }
+
+  /// Fallback reasons are logged unless explicitly disabled.
+  bool logFallback() const {
+    return get<bool>(kCudfLogFallback, true);
+  }
+
+  std::string memoryResource() const {
+    return get<std::string>(kCudfMemoryResource, "async");
+  }
+
+  int32_t memoryPercent() const {
+    return get<int32_t>(kCudfMemoryPercent, 50);
+  }
+
+  std::string outputMemoryResource() const {
+    return get<std::string>(kCudfOutputMemoryResource, "");
+  }
+
+  std::string functionNamePrefix() const {
+    return get<std::string>(kCudfFunctionNamePrefix, "");
+  }
+
+  std::string functionEngine() const {
+    return get<std::string>(kCudfFunctionEngine, "presto");
+  }
+
+  bool astExpressionEnabled() const {
+    return get<bool>(kCudfAstExpressionEnabled, true);
+  }
+
+  int32_t astExpressionPriority() const {
+    return get<int32_t>(kCudfAstExpressionPriority, 100);
+  }
+
+  bool jitExpressionEnabled() const {
+    return get<bool>(kCudfJitExpressionEnabled, true);
+  }
+
+  /// Defaults above the AST priority (100) so that JIT is preferred when both
+  /// can evaluate an expression.
+  int32_t jitExpressionPriority() const {
+    return get<int32_t>(kCudfJitExpressionPriority, 101);
+  }
+
+  bool allowCpuFallback() const {
+    return get<bool>(kCudfAllowCpuFallback, true);
+  }
+
+  bool concatOptimizationEnabled() const {
+    return get<bool>(kCudfConcatOptimizationEnabled, false);
+  }
+
+  int32_t batchSizeMinThreshold() const {
+    return get<int32_t>(kCudfBatchSizeMinThreshold, 100000);
+  }
+
+  std::optional<int32_t> batchSizeMaxThreshold() const {
+    return get<int32_t>(kCudfBatchSizeMaxThreshold);
+  }
+
+  int32_t topNBatchSize() const {
+    return get<int32_t>(kCudfTopNBatchSize, 5);
+  }
+
+ private:
+  void validateConfigs();
+};
+
+} // namespace facebook::velox::cudf_velox
diff --git a/velox/experimental/cudf/exec/CMakeLists.txt b/velox/experimental/cudf/exec/CMakeLists.txt
index fdf57f06629..0f38ce60ba6 100644
--- a/velox/experimental/cudf/exec/CMakeLists.txt
+++ b/velox/experimental/cudf/exec/CMakeLists.txt
@@ -40,6 +40,7 @@ target_link_libraries(
   velox_arrow_bridge
   velox_exception
   velox_common_base
+  velox_cudf_config
   velox_cudf_vector
   velox_exec
   velox_cudf_hive_connector
diff --git a/velox/experimental/cudf/exec/CudfBatchConcat.cpp b/velox/experimental/cudf/exec/CudfBatchConcat.cpp
index 9919de1b324..94f74c58d64 100644
--- a/velox/experimental/cudf/exec/CudfBatchConcat.cpp
+++ b/velox/experimental/cudf/exec/CudfBatchConcat.cpp
@@ -14,8 +14,8 @@
  * limitations under the License.
*/ -#include "velox/experimental/cudf/CudfConfig.h" #include "velox/experimental/cudf/CudfNoDefaults.h" +#include "velox/experimental/cudf/common/CudfSystemConfig.h" #include "velox/experimental/cudf/exec/CudfBatchConcat.h" #include "velox/experimental/cudf/exec/GpuResources.h" #include "velox/experimental/cudf/exec/Utilities.h" @@ -37,7 +37,7 @@ CudfBatchConcat::CudfBatchConcat( planNode->id(), nvtx3::rgb{211, 211, 211} /* LightGrey */), driverCtx_(driverCtx), - targetRows_(CudfConfig::getInstance().batchSizeMinThreshold) {} + targetRows_(CudfSystemConfig::getInstance().batchSizeMinThreshold()) {} void CudfBatchConcat::addInput(RowVectorPtr input) { auto cudfVector = std::dynamic_pointer_cast(input); diff --git a/velox/experimental/cudf/exec/CudfFilterProject.cpp b/velox/experimental/cudf/exec/CudfFilterProject.cpp index 3e8b7597982..8951443d69f 100644 --- a/velox/experimental/cudf/exec/CudfFilterProject.cpp +++ b/velox/experimental/cudf/exec/CudfFilterProject.cpp @@ -14,8 +14,8 @@ * limitations under the License. 
*/ -#include "velox/experimental/cudf/CudfConfig.h" #include "velox/experimental/cudf/CudfNoDefaults.h" +#include "velox/experimental/cudf/common/CudfSystemConfig.h" #include "velox/experimental/cudf/exec/CudfFilterProject.h" #include "velox/experimental/cudf/exec/GpuResources.h" #include "velox/experimental/cudf/exec/VeloxCudfInterop.h" @@ -196,7 +196,7 @@ void CudfFilterProject::initialize() { : filter_->sources()[0]->outputType(); // convert to AST - if (CudfConfig::getInstance().debugEnabled) { + if (CudfSystemConfig::getInstance().debugEnabled()) { int i = 0; for (const auto& expr : expr->exprs()) { LOG(INFO) << "expr[" << i++ << "] " << expr->toString(); @@ -256,7 +256,7 @@ RowVectorPtr CudfFilterProject::getOutput() { stream.synchronize(); auto const numColumns = outputTable->num_columns(); auto const size = outputTable->num_rows(); - if (CudfConfig::getInstance().debugEnabled) { + if (CudfSystemConfig::getInstance().debugEnabled()) { VLOG(1) << "cudfProject Output: " << size << " rows, " << numColumns << " columns"; } diff --git a/velox/experimental/cudf/exec/CudfHashAggregation.cpp b/velox/experimental/cudf/exec/CudfHashAggregation.cpp index 57e49070f8d..30d7b6b5716 100644 --- a/velox/experimental/cudf/exec/CudfHashAggregation.cpp +++ b/velox/experimental/cudf/exec/CudfHashAggregation.cpp @@ -14,8 +14,8 @@ * limitations under the License. 
*/ -#include "velox/experimental/cudf/CudfConfig.h" #include "velox/experimental/cudf/CudfNoDefaults.h" +#include "velox/experimental/cudf/common/CudfSystemConfig.h" #include "velox/experimental/cudf/exec/CudfFilterProject.h" #include "velox/experimental/cudf/exec/CudfHashAggregation.h" #include "velox/experimental/cudf/exec/GpuResources.h" @@ -681,8 +681,10 @@ std::unique_ptr createAggregator( uint32_t inputIndex, VectorPtr constant, bool isGlobal, - const TypePtr& resultType) { - auto prefix = cudf_velox::CudfConfig::getInstance().functionNamePrefix; + const TypePtr& resultType, + const cudf_velox::CudfSystemConfig& cudfConfig) { + const auto prefix = + cudf_velox::CudfSystemConfig::getInstance().functionNamePrefix(); if (kind.rfind(prefix + "sum", 0) == 0) { return std::make_unique( step, inputIndex, constant, isGlobal, resultType); @@ -1875,7 +1877,7 @@ bool registerStepAwareBuiltinAggregationFunctions(const std::string& prefix) { // AVG partial REAL->row(DOUBLE,BIGINT) and intermediate are the same for // both engines and are already registered above. - if (CudfConfig::getInstance().functionEngine == "spark") { + if (CudfSystemConfig::getInstance().functionEngine() == "spark") { // Spark: SUM(REAL) -> DOUBLE, AVG(REAL) -> DOUBLE appendRegisterAggregationFunctionForStep( prefix + "sum", diff --git a/velox/experimental/cudf/exec/CudfHashJoin.cpp b/velox/experimental/cudf/exec/CudfHashJoin.cpp index aebc4debe16..7fad366b99a 100644 --- a/velox/experimental/cudf/exec/CudfHashJoin.cpp +++ b/velox/experimental/cudf/exec/CudfHashJoin.cpp @@ -14,8 +14,8 @@ * limitations under the License. 
*/ -#include "velox/experimental/cudf/CudfConfig.h" #include "velox/experimental/cudf/CudfNoDefaults.h" +#include "velox/experimental/cudf/common/CudfSystemConfig.h" #include "velox/experimental/cudf/exec/CudfHashJoin.h" #include "velox/experimental/cudf/exec/GpuResources.h" #include "velox/experimental/cudf/exec/Utilities.h" @@ -87,7 +87,7 @@ void CudfHashJoinProbe::close() { void CudfHashJoinBridge::setHashTable( std::optional hashObject) { - if (CudfConfig::getInstance().debugEnabled) { + if (CudfSystemConfig::getInstance().debugEnabled()) { VLOG(2) << "Calling CudfHashJoinBridge::setHashTable"; } std::vector promises; @@ -104,22 +104,22 @@ void CudfHashJoinBridge::setHashTable( std::optional CudfHashJoinBridge::hashOrFuture( ContinueFuture* future) { - if (CudfConfig::getInstance().debugEnabled) { + if (CudfSystemConfig::getInstance().debugEnabled()) { VLOG(2) << "Calling CudfHashJoinBridge::hashOrFuture"; } std::lock_guard l(mutex_); if (hashObject_.has_value()) { return hashObject_; } - if (CudfConfig::getInstance().debugEnabled) { + if (CudfSystemConfig::getInstance().debugEnabled()) { VLOG(2) << "Calling CudfHashJoinBridge::hashOrFuture constructing promise"; } promises_.emplace_back("CudfHashJoinBridge::hashOrFuture"); - if (CudfConfig::getInstance().debugEnabled) { + if (CudfSystemConfig::getInstance().debugEnabled()) { VLOG(2) << "Calling CudfHashJoinBridge::hashOrFuture getSemiFuture"; } *future = promises_.back().getSemiFuture(); - if (CudfConfig::getInstance().debugEnabled) { + if (CudfSystemConfig::getInstance().debugEnabled()) { VLOG(2) << "Calling CudfHashJoinBridge::hashOrFuture returning nullopt"; } return std::nullopt; @@ -151,13 +151,13 @@ CudfHashJoinBuild::CudfHashJoinBuild( operatorId, fmt::format("[{}]", joinNode->id())), joinNode_(joinNode) { - if (CudfConfig::getInstance().debugEnabled) { + if (CudfSystemConfig::getInstance().debugEnabled()) { VLOG(2) << "CudfHashJoinBuild constructor"; } } void CudfHashJoinBuild::addInput(RowVectorPtr 
input) { - if (CudfConfig::getInstance().debugEnabled) { + if (CudfSystemConfig::getInstance().debugEnabled()) { VLOG(2) << "Calling CudfHashJoinBuild::addInput"; } // Queue inputs, process all at once. @@ -177,7 +177,7 @@ void CudfHashJoinBuild::addInput(RowVectorPtr input) { } bool CudfHashJoinBuild::needsInput() const { - if (CudfConfig::getInstance().debugEnabled) { + if (CudfSystemConfig::getInstance().debugEnabled()) { VLOG(2) << "Calling CudfHashJoinBuild::needsInput"; } return !noMoreInput_; @@ -188,7 +188,8 @@ RowVectorPtr CudfHashJoinBuild::getOutput() { } void CudfHashJoinBuild::noMoreInput() { - if (CudfConfig::getInstance().debugEnabled) { + bool debugEnabled = CudfSystemConfig::getInstance().debugEnabled(); + if (debugEnabled) { VLOG(2) << "Calling CudfHashJoinBuild::noMoreInput"; } VELOX_NVTX_OPERATOR_FUNC_RANGE(); @@ -217,7 +218,7 @@ void CudfHashJoinBuild::noMoreInput() { } }; - if (CudfConfig::getInstance().debugEnabled) { + if (debugEnabled) { VLOG(1) << "CudfHashJoinBuild: build batches"; VLOG(1) << "Build batches number of columns: " << inputs_[0]->getTableView().num_columns(); @@ -239,7 +240,7 @@ void CudfHashJoinBuild::noMoreInput() { for (auto const& tbl : tbls) { VELOX_CHECK_NOT_NULL(tbl); } - if (CudfConfig::getInstance().debugEnabled) { + if (debugEnabled) { VLOG(1) << "Build table number of columns: " << tbls[0]->num_columns(); for (auto i = 0; i < tbls.size(); i++) { VLOG(1) << "Build table " << i @@ -274,7 +275,7 @@ void CudfHashJoinBuild::noMoreInput() { if (buildHashJoin) { VELOX_CHECK_NOT_NULL(hashObjects.back()); } - if (CudfConfig::getInstance().debugEnabled) { + if (debugEnabled) { if (hashObjects.back() != nullptr) { VLOG(2) << "hashObject " << i << " is not nullptr " << hashObjects.back().get() << "\n"; @@ -330,13 +331,14 @@ CudfHashJoinProbe::CudfHashJoinProbe( probeType_(joinNode_->sources()[0]->outputType()), buildType_(joinNode_->sources()[1]->outputType()), cudaEvent_(std::make_unique(cudaEventDisableTiming)) { - if 
(CudfConfig::getInstance().debugEnabled) { + bool debugEnabled = CudfSystemConfig::getInstance().debugEnabled(); + if (debugEnabled) { VLOG(2) << "CudfHashJoinProbe constructor"; } auto const& leftKeys = joinNode_->leftKeys(); // probe keys auto const& rightKeys = joinNode_->rightKeys(); // build keys - if (CudfConfig::getInstance().debugEnabled) { + if (debugEnabled) { for (int i = 0; i < probeType_->names().size(); i++) { VLOG(1) << "Left column " << i << ": " << probeType_->names()[i]; } @@ -378,7 +380,7 @@ CudfHashJoinProbe::CudfHashJoinProbe( rightColumnOutputIndices_ = std::vector(); for (int i = 0; i < outputType->names().size(); i++) { auto const outputName = outputType->names()[i]; - if (CudfConfig::getInstance().debugEnabled) { + if (debugEnabled) { VLOG(1) << "Output column " << i << ": " << outputName; } auto channel = probeType_->getChildIdxIfExists(outputName); @@ -399,7 +401,7 @@ CudfHashJoinProbe::CudfHashJoinProbe( "Join field {} not in probe or build input", outputType->children()[i]); } - if (CudfConfig::getInstance().debugEnabled) { + if (debugEnabled) { for (int i = 0; i < leftColumnIndicesToGather_.size(); i++) { VLOG(1) << "Left index to gather " << i << ": " << leftColumnIndicesToGather_[i]; @@ -455,7 +457,7 @@ CudfHashJoinProbe::CudfHashJoinProbe( } bool CudfHashJoinProbe::needsInput() const { - if (CudfConfig::getInstance().debugEnabled) { + if (CudfSystemConfig::getInstance().debugEnabled()) { VLOG(2) << "Calling CudfHashJoinProbe::needsInput"; } if (joinNode_->isRightSemiFilterJoin()) { @@ -493,7 +495,8 @@ void CudfHashJoinProbe::addInput(RowVectorPtr input) { } void CudfHashJoinProbe::noMoreInput() { - if (CudfConfig::getInstance().debugEnabled) { + bool debugEnabled = CudfSystemConfig::getInstance().debugEnabled(); + if (debugEnabled) { VLOG(2) << "Calling CudfHashJoinProbe::noMoreInput"; } VELOX_NVTX_OPERATOR_FUNC_RANGE(); @@ -602,7 +605,7 @@ void CudfHashJoinProbe::noMoreInput() { VELOX_CHECK_NOT_NULL(tbl); - if 
(CudfConfig::getInstance().debugEnabled) { + if (debugEnabled) { VLOG(1) << "Probe table number of columns: " << tbl->num_columns(); VLOG(1) << "Probe table number of rows: " << tbl->num_rows(); } @@ -632,7 +635,7 @@ std::unique_ptr CudfHashJoinProbe::unfilteredOutput( auto rightResult = cudf::gather( rightInput, rightIndicesCol, oobPolicy, stream, get_output_mr()); - if (CudfConfig::getInstance().debugEnabled) { + if (CudfSystemConfig::getInstance().debugEnabled()) { VLOG(1) << "Left result number of columns: " << leftResult->num_columns(); VLOG(1) << "Right result number of columns: " << rightResult->num_columns(); } @@ -1355,7 +1358,8 @@ std::vector> CudfHashJoinProbe::antiJoin( } RowVectorPtr CudfHashJoinProbe::getOutput() { - if (CudfConfig::getInstance().debugEnabled) { + bool debugEnabled = CudfSystemConfig::getInstance().debugEnabled(); + if (debugEnabled) { VLOG(2) << "Calling CudfHashJoinProbe::getOutput"; } VELOX_NVTX_OPERATOR_FUNC_RANGE(); @@ -1449,7 +1453,7 @@ RowVectorPtr CudfHashJoinProbe::getOutput() { // Use getTableView() to avoid expensive materialization for packed_table. // cudfInput is staying alive until the table view is no longer needed. 
auto leftTableView = cudfInput->getTableView(); - if (CudfConfig::getInstance().debugEnabled) { + if (debugEnabled) { VLOG(1) << "Probe table number of columns: " << leftTableView.num_columns(); VLOG(1) << "Probe table number of rows: " << leftTableView.num_rows(); } @@ -1460,7 +1464,7 @@ RowVectorPtr CudfHashJoinProbe::getOutput() { auto& rightTable = rightTables[i]; auto& hb = hbs[i]; VELOX_CHECK_NOT_NULL(rightTable); - if (CudfConfig::getInstance().debugEnabled) { + if (debugEnabled) { if (rightTable != nullptr) VLOG(2) << "right_table is not nullptr " << rightTable.get() << " hasValue(" << hashObject_.has_value() << ")\n"; @@ -1556,7 +1560,7 @@ exec::BlockingReason CudfHashJoinProbe::isBlocked(ContinueFuture* future) { auto hashObject = cudfJoinBridge->hashOrFuture(future); if (!hashObject.has_value()) { - if (CudfConfig::getInstance().debugEnabled) { + if (CudfSystemConfig::getInstance().debugEnabled()) { VLOG(2) << "CudfHashJoinProbe is blocked, waiting for join build"; } return exec::BlockingReason::kWaitForJoinBuild; @@ -1644,7 +1648,7 @@ std::unique_ptr CudfHashJoinBridgeTranslator::toOperator( exec::DriverCtx* ctx, int32_t id, const core::PlanNodePtr& node) { - if (CudfConfig::getInstance().debugEnabled) { + if (CudfSystemConfig::getInstance().debugEnabled()) { VLOG(2) << "Calling CudfHashJoinBridgeTranslator::toOperator"; } if (auto joinNode = @@ -1656,7 +1660,7 @@ std::unique_ptr CudfHashJoinBridgeTranslator::toOperator( std::unique_ptr CudfHashJoinBridgeTranslator::toJoinBridge( const core::PlanNodePtr& node) { - if (CudfConfig::getInstance().debugEnabled) { + if (CudfSystemConfig::getInstance().debugEnabled()) { VLOG(2) << "Calling CudfHashJoinBridgeTranslator::toJoinBridge"; } if (auto joinNode = @@ -1669,7 +1673,7 @@ std::unique_ptr CudfHashJoinBridgeTranslator::toJoinBridge( exec::OperatorSupplier CudfHashJoinBridgeTranslator::toOperatorSupplier( const core::PlanNodePtr& node) { - if (CudfConfig::getInstance().debugEnabled) { + if 
(CudfSystemConfig::getInstance().debugEnabled()) { VLOG(2) << "Calling CudfHashJoinBridgeTranslator::toOperatorSupplier"; } if (auto joinNode = diff --git a/velox/experimental/cudf/exec/CudfTopN.cpp b/velox/experimental/cudf/exec/CudfTopN.cpp index 8e259dfb652..5daf164a412 100644 --- a/velox/experimental/cudf/exec/CudfTopN.cpp +++ b/velox/experimental/cudf/exec/CudfTopN.cpp @@ -13,8 +13,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#include "velox/experimental/cudf/CudfConfig.h" #include "velox/experimental/cudf/CudfNoDefaults.h" +#include "velox/experimental/cudf/common/CudfSystemConfig.h" #include "velox/experimental/cudf/exec/CudfTopN.h" #include "velox/experimental/cudf/exec/GpuResources.h" #include "velox/experimental/cudf/exec/Utilities.h" @@ -28,6 +28,16 @@ #include namespace facebook::velox::cudf_velox { +namespace { +inline int32_t topNBatchSize(const core::QueryCtx& ctx) { + std::optional topNBatchSizeSession = + ctx.queryConfig().get(core::QueryConfig::kCudfTopNBatchSize); + return topNBatchSizeSession.has_value() + ? 
topNBatchSizeSession.value() + : CudfSystemConfig::getInstance().topNBatchSize(); +} +} // namespace + CudfTopN::CudfTopN( int32_t operatorId, exec::DriverCtx* driverCtx, @@ -44,7 +54,7 @@ CudfTopN::CudfTopN( fmt::format("[{}]", topNNode->id())), count_(topNNode->count()), topNNode_(topNNode), - kBatchSize_(CudfConfig::getInstance().topNBatchSize), + kBatchSize_(topNBatchSize(operatorCtx_->execCtx()->queryCtx())), cudaEvent_(std::make_unique(cudaEventDisableTiming)) { const auto numColumns{outputType_->children().size()}; const auto numSortingKeys{topNNode->sortingKeys().size()}; diff --git a/velox/experimental/cudf/exec/CudfTopN.h b/velox/experimental/cudf/exec/CudfTopN.h index a08dff7b780..b11ddce6f26 100644 --- a/velox/experimental/cudf/exec/CudfTopN.h +++ b/velox/experimental/cudf/exec/CudfTopN.h @@ -15,7 +15,6 @@ */ #pragma once -#include "velox/experimental/cudf/CudfConfig.h" #include "velox/experimental/cudf/exec/NvtxHelper.h" #include "velox/experimental/cudf/vector/CudfVector.h" diff --git a/velox/experimental/cudf/exec/OperatorAdapters.cpp b/velox/experimental/cudf/exec/OperatorAdapters.cpp index 1cb04e8f381..c854b4af64a 100644 --- a/velox/experimental/cudf/exec/OperatorAdapters.cpp +++ b/velox/experimental/cudf/exec/OperatorAdapters.cpp @@ -14,6 +14,7 @@ * limitations under the License. 
*/ +#include "velox/experimental/cudf/common/CudfSystemConfig.h" #include "velox/experimental/cudf/connectors/hive/CudfHiveConnector.h" #include "velox/experimental/cudf/exec/CudfAssignUniqueId.h" #include "velox/experimental/cudf/exec/CudfBatchConcat.h" @@ -248,7 +249,7 @@ class AggregationAdapter : public OperatorAdapter { std::dynamic_pointer_cast(planNode); std::vector> result; - if (CudfConfig::getInstance().concatOptimizationEnabled) { + if (CudfSystemConfig::getInstance().concatOptimizationEnabled()) { result.push_back( std::make_unique( operatorId, ctx, aggregationPlanNode)); diff --git a/velox/experimental/cudf/exec/ToCudf.cpp b/velox/experimental/cudf/exec/ToCudf.cpp index 1f3a8ef8235..7c8bf5c4ede 100644 --- a/velox/experimental/cudf/exec/ToCudf.cpp +++ b/velox/experimental/cudf/exec/ToCudf.cpp @@ -14,7 +14,7 @@ * limitations under the License. */ -#include "velox/experimental/cudf/CudfConfig.h" +#include "velox/experimental/cudf/common/CudfSystemConfig.h" #include "velox/experimental/cudf/exec/CudfConversion.h" #include "velox/experimental/cudf/exec/CudfHashAggregation.h" #include "velox/experimental/cudf/exec/CudfHashJoin.h" @@ -68,7 +68,7 @@ bool CompileState::compile(bool allowCpuFallback) { auto operators = driver_.operators(); // Cache debug flag to avoid repeated getInstance() calls - const bool debugEnabled = CudfConfig::getInstance().debugEnabled; + const bool debugEnabled = CudfSystemConfig::getInstance().debugEnabled(); // Cache "before" operator descriptions so we can print before/after together. 
std::vector> beforeOperators; @@ -80,7 +80,6 @@ bool CompileState::compile(bool allowCpuFallback) { bool replacementsMade = false; auto ctx = driver_.driverCtx(); - // Helper to check if planNodeId is valid (some operators like CallbackSink // have "N/A") auto isValidPlanNodeId = [](const core::PlanNodeId& id) { @@ -276,9 +275,7 @@ struct CudfDriverAdapter { // Call operator needed by DriverAdapter bool operator()(const exec::DriverFactory& factory, exec::Driver& driver) { - if (!driver.driverCtx()->queryConfig().get( - CudfConfig::kCudfEnabled, CudfConfig::getInstance().enabled) && - allowCpuFallback_) { + if (!CudfSystemConfig::getInstance().debugEnabled() && allowCpuFallback_) { return false; } auto state = CompileState(factory, driver); @@ -304,39 +301,40 @@ void registerCudf() { // Register operator adapters registerAllOperatorAdapters(); - auto prefix = CudfConfig::getInstance().functionNamePrefix; + auto& config = CudfSystemConfig::getInstance(); + + auto prefix = config.functionNamePrefix(); registerBuiltinFunctions(prefix); registerStepAwareBuiltinAggregationFunctions(prefix); CUDF_FUNC_RANGE(); cudaFree(nullptr); // Initialize CUDA context at startup - const std::string mrMode = CudfConfig::getInstance().memoryResource; auto mr = cudf_velox::createMemoryResource( - mrMode, CudfConfig::getInstance().memoryPercent); + config.memoryResource(), config.memoryPercent()); cudf::set_current_device_resource(mr.get()); mr_ = mr; - const auto& outputMrMode = CudfConfig::getInstance().outputMemoryResource; + const auto& outputMrMode = config.outputMemoryResource(); if (!outputMrMode.empty() && outputMrMode != mrMode) { - output_mr_ = cudf_velox::createMemoryResource( - outputMrMode, CudfConfig::getInstance().memoryPercent); + output_mr_ = + cudf_velox::createMemoryResource(outputMrMode, config.memoryPercent()); } else { output_mr_ = mr_; } exec::Operator::registerOperator( std::make_unique()); - CudfDriverAdapter cda{CudfConfig::getInstance().allowCpuFallback}; + 
CudfDriverAdapter cda{config.allowCpuFallback()}; exec::DriverAdapter cudfAdapter{kCudfAdapterName, {}, cda}; exec::DriverFactory::registerAdapter(cudfAdapter); - if (CudfConfig::getInstance().astExpressionEnabled) { - registerAstEvaluator(CudfConfig::getInstance().astExpressionPriority); + if (config.astExpressionEnabled()) { + registerAstEvaluator(config.astExpressionPriority()); } - if (CudfConfig::getInstance().jitExpressionEnabled) { - registerJitEvaluator(CudfConfig::getInstance().jitExpressionPriority); + if (config.jitExpressionEnabled()) { + registerJitEvaluator(config.jitExpressionPriority()); } isCudfRegistered = true; @@ -357,65 +355,4 @@ void unregisterCudf() { isCudfRegistered = false; } -CudfConfig& CudfConfig::getInstance() { - static CudfConfig instance; - return instance; -} - -void CudfConfig::initialize( - std::unordered_map&& config) { - if (config.find(kCudfEnabled) != config.end()) { - enabled = folly::to(config[kCudfEnabled]); - } - if (config.find(kCudfDebugEnabled) != config.end()) { - debugEnabled = folly::to(config[kCudfDebugEnabled]); - } - if (config.find(kCudfMemoryResource) != config.end()) { - memoryResource = config[kCudfMemoryResource]; - } - if (config.find(kCudfMemoryPercent) != config.end()) { - memoryPercent = folly::to(config[kCudfMemoryPercent]); - } - if (config.find(kCudfOutputMr) != config.end()) { - outputMemoryResource = config[kCudfOutputMr]; - } - if (config.find(kCudfBatchSizeMinThreshold) != config.end()) { - batchSizeMinThreshold = - folly::to(config[kCudfBatchSizeMinThreshold]); - } - if (config.find(kCudfBatchSizeMaxThreshold) != config.end()) { - batchSizeMaxThreshold = - folly::to(config[kCudfBatchSizeMaxThreshold]); - } - if (config.find(kCudfConcatOptimizationEnabled) != config.end()) { - concatOptimizationEnabled = - folly::to(config[kCudfConcatOptimizationEnabled]); - } - if (config.find(kCudfFunctionNamePrefix) != config.end()) { - functionNamePrefix = config[kCudfFunctionNamePrefix]; - } - if 
(config.find(kCudfAstExpressionEnabled) != config.end()) { - astExpressionEnabled = folly::to(config[kCudfAstExpressionEnabled]); - } - if (config.find(kCudfJitExpressionEnabled) != config.end()) { - jitExpressionEnabled = folly::to(config[kCudfJitExpressionEnabled]); - } - if (config.find(kCudfAstExpressionPriority) != config.end()) { - astExpressionPriority = - folly::to(config[kCudfAstExpressionPriority]); - } - if (config.find(kCudfAllowCpuFallback) != config.end()) { - allowCpuFallback = folly::to(config[kCudfAllowCpuFallback]); - } - if (config.find(kCudfLogFallback) != config.end()) { - logFallback = folly::to(config[kCudfLogFallback]); - } - if (config.find(kCudfTopNBatchSize) != config.end()) { - topNBatchSize = folly::to(config[kCudfTopNBatchSize]); - } - if (config.find(kCudfFunctionEngine) != config.end()) { - functionEngine = config[kCudfFunctionEngine]; - } -} - } // namespace facebook::velox::cudf_velox diff --git a/velox/experimental/cudf/exec/Utilities.cpp b/velox/experimental/cudf/exec/Utilities.cpp index 98c16d3791e..e8050afe82d 100644 --- a/velox/experimental/cudf/exec/Utilities.cpp +++ b/velox/experimental/cudf/exec/Utilities.cpp @@ -14,8 +14,8 @@ * limitations under the License. */ -#include "velox/experimental/cudf/CudfConfig.h" #include "velox/experimental/cudf/CudfNoDefaults.h" +#include "velox/experimental/cudf/common/CudfSystemConfig.h" #include "velox/experimental/cudf/exec/Utilities.h" #include "velox/experimental/cudf/exec/VeloxCudfInterop.h" @@ -137,8 +137,9 @@ std::vector> getConcatenatedTableBatched( std::vector> outputTables; const auto& cudfConfig = CudfConfig::getInstance(); - auto const maxRows = cudfConfig.batchSizeMaxThreshold - ? static_cast(cudfConfig.batchSizeMaxThreshold.value()) + const auto maxBatchSize = cudfConfig.batchSizeMaxThreshold(); + auto const maxRows = maxBatchSize + ? 
static_cast(maxBatchSize.value()) : static_cast(std::numeric_limits::max()); size_t startpos = 0; size_t runningRows = 0; diff --git a/velox/experimental/cudf/exec/Validation.h b/velox/experimental/cudf/exec/Validation.h index d5ac57d666f..ee5355b2c16 100644 --- a/velox/experimental/cudf/exec/Validation.h +++ b/velox/experimental/cudf/exec/Validation.h @@ -16,14 +16,14 @@ #pragma once -#include "velox/experimental/cudf/CudfConfig.h" +#include "velox/experimental/cudf/common/CudfSystemConfig.h" #include "velox/common/base/ExceptionHelper.h" namespace facebook::velox::cudf_velox { /// Log a reason for falling back to Velox CPU execution if -/// CudfConfig::getInstance().logFallback is enabled. It captures the +/// CudfSystemConfig::getInstance().logFallback is enabled. It captures the /// file name, line number, function name, and reason for the failure. The /// validation failure introduces extra data format conversion which can /// negatively impact the performance, so log the messgage to notify the @@ -37,7 +37,7 @@ namespace facebook::velox::cudf_velox { /// toType->toString()); #define LOG_FALLBACK(...) \ do { \ - if (CudfConfig::getInstance().logFallback) { \ + if (CudfSystemConfig::getInstance().logFallback()) { \ auto message = ::facebook::velox::errorMessage(__VA_ARGS__); \ LOG(WARNING) << fmt::format( \ "Validation failed at function: {}, reason: Operation is not supported in cuDF execution: {}", \ diff --git a/velox/experimental/cudf/expression/AstExpression.cpp b/velox/experimental/cudf/expression/AstExpression.cpp index 00693061be4..bcf89eda6f3 100644 --- a/velox/experimental/cudf/expression/AstExpression.cpp +++ b/velox/experimental/cudf/expression/AstExpression.cpp @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -#include "velox/experimental/cudf/CudfConfig.h" +#include "velox/experimental/cudf/common/CudfSystemConfig.h" #include "velox/experimental/cudf/exec/VeloxCudfInterop.h" #include "velox/experimental/cudf/expression/AstExpression.h" #include "velox/experimental/cudf/expression/AstExpressionUtils.h" @@ -105,7 +105,7 @@ ColumnOrView ASTExpression::eval( precomputedColumns[columnIndex - inputColumnViews.size()]); } } else { - if (CudfConfig::getInstance().debugEnabled) { + if (cudf_velox::CudfSystemConfig::getInstance().debugEnabled()) { LOG(INFO) << cudf::ast::expression_to_string(cudfTree_.back()); LOG(INFO) << cudf::table_schema_to_string(astInputTableView); } diff --git a/velox/experimental/cudf/expression/AstExpressionUtils.h b/velox/experimental/cudf/expression/AstExpressionUtils.h index 64fc5b0d1ce..85cc3e6aa0c 100644 --- a/velox/experimental/cudf/expression/AstExpressionUtils.h +++ b/velox/experimental/cudf/expression/AstExpressionUtils.h @@ -16,7 +16,7 @@ #pragma once -#include "velox/experimental/cudf/CudfConfig.h" +#include "velox/experimental/cudf/common/CudfSystemConfig.h" #include "velox/experimental/cudf/exec/GpuResources.h" #include "velox/experimental/cudf/exec/VeloxCudfInterop.h" #include "velox/experimental/cudf/expression/AstExpression.h" @@ -225,8 +225,8 @@ bool isAstExprSupported(const std::shared_ptr& expr) { using velox::exec::FieldReference; using Op = cudf::ast::ast_operator; - const auto name = - stripPrefix(expr->name(), CudfConfig::getInstance().functionNamePrefix); + const auto name = stripPrefix( + expr->name(), CudfSystemConfig::getInstance().functionNamePrefix()); const auto len = expr->inputs().size(); // Literals and field references are always supported @@ -417,8 +417,8 @@ cudf::ast::expression const& AstContext::multipleInputsToPairWise( const std::shared_ptr& expr) { using Operation = cudf::ast::operation; - const auto name = - stripPrefix(expr->name(), CudfConfig::getInstance().functionNamePrefix); + const auto name = 
stripPrefix( + expr->name(), CudfSystemConfig::getInstance().functionNamePrefix()); auto len = expr->inputs().size(); // Create a simple chain of operations auto result = &pushExprToTree(expr->inputs()[0]); @@ -443,8 +443,8 @@ cudf::ast::expression const& AstContext::pushExprToTree( using velox::exec::ConstantExpr; using velox::exec::FieldReference; - const auto name = - stripPrefix(expr->name(), CudfConfig::getInstance().functionNamePrefix); + const auto name = stripPrefix( + expr->name(), CudfSystemConfig::getInstance().functionNamePrefix()); auto len = expr->inputs().size(); auto& type = expr->type(); @@ -485,167 +485,168 @@ cudf::ast::expression const& AstContext::pushExprToTree( } else if (name == "isnotnull") { VELOX_CHECK_EQ(len, 1); auto const& op1 = pushExprToTree(expr->inputs()[0]); - auto const& nullOp = tree.push(Operation{Op::IS_NULL, op1}); - return tree.push(Operation{Op::NOT, nullOp}); + auto const& nullOp = tree.push(Operation{Op::IS_NULL, op1)); + return tree.push(Operation{Op::NOT, nullOp}); } else if (name == "between") { - VELOX_CHECK_EQ(len, 3); - auto const& value = pushExprToTree(expr->inputs()[0]); - auto const& lower = pushExprToTree(expr->inputs()[1]); - auto const& upper = pushExprToTree(expr->inputs()[2]); - // construct between(op2, op3) using >= and <= - auto const& geLower = tree.push(Operation{Op::GREATER_EQUAL, value, lower}); - auto const& leUpper = tree.push(Operation{Op::LESS_EQUAL, value, upper}); - return tree.push(Operation{Op::NULL_LOGICAL_AND, geLower, leUpper}); + VELOX_CHECK_EQ(len, 3); + auto const& value = pushExprToTree(expr->inputs()[0]); + auto const& lower = pushExprToTree(expr->inputs()[1]); + auto const& upper = pushExprToTree(expr->inputs()[2]); + // construct between(op2, op3) using >= and <= + auto const& geLower = + tree.push(Operation{Op::GREATER_EQUAL, value, lower}); + auto const& leUpper = tree.push(Operation{Op::LESS_EQUAL, value, upper}); + return tree.push(Operation{Op::NULL_LOGICAL_AND, geLower, 
leUpper}); } else if (name == "in") { - // number of inputs is variable. >=2 - VELOX_CHECK_EQ(len, 2); - // actually len is 2, second input is ARRAY - auto const& op1 = pushExprToTree(expr->inputs()[0]); - auto c = dynamic_cast(expr->inputs()[1].get()); - VELOX_CHECK_NOT_NULL(c, "literal expression should be ConstantExpr"); - auto value = c->value(); - VELOX_CHECK_NOT_NULL(value, "ConstantExpr value is null"); - - // Use the new createLiteralsFromArray function to get literals - auto literals = createLiteralsFromArray(value, scalars); - - // Create equality expressions for each literal and OR them together - std::vector exprVec; - for (auto& literal : literals) { - auto const& opi = tree.push(std::move(literal)); - auto const& logicalNode = tree.push(Operation{Op::EQUAL, op1, opi}); - exprVec.push_back(&logicalNode); - } + // number of inputs is variable. >=2 + VELOX_CHECK_EQ(len, 2); + // actually len is 2, second input is ARRAY + auto const& op1 = pushExprToTree(expr->inputs()[0]); + auto c = dynamic_cast(expr->inputs()[1].get()); + VELOX_CHECK_NOT_NULL(c, "literal expression should be ConstantExpr"); + auto value = c->value(); + VELOX_CHECK_NOT_NULL(value, "ConstantExpr value is null"); + + // Use the new createLiteralsFromArray function to get literals + auto literals = createLiteralsFromArray(value, scalars); + + // Create equality expressions for each literal and OR them together + std::vector exprVec; + for (auto& literal : literals) { + auto const& opi = tree.push(std::move(literal)); + auto const& logicalNode = tree.push(Operation{Op::EQUAL, op1, opi}); + exprVec.push_back(&logicalNode); + } - // Handle empty IN list case - if (exprVec.empty()) { - // FAIL - VELOX_FAIL("Empty IN list"); - // Return FALSE for empty IN list - // auto falseValue = std::make_shared>( - // value->pool(), 1, false, TypeKind::BOOLEAN, false); - // return tree.push(createLiteral(falseValue, scalars)); - } + // Handle empty IN list case + if (exprVec.empty()) { + // FAIL + 
VELOX_FAIL("Empty IN list"); + // Return FALSE for empty IN list + // auto falseValue = std::make_shared>( + // value->pool(), 1, false, TypeKind::BOOLEAN, false); + // return tree.push(createLiteral(falseValue, scalars)); + } - // OR all logical nodes - auto* result = exprVec[0]; - for (size_t i = 1; i < exprVec.size(); i++) { - auto const& treeNode = - tree.push(Operation{Op::NULL_LOGICAL_OR, *result, *exprVec[i]}); - result = &treeNode; - } - return *result; + // OR all logical nodes + auto* result = exprVec[0]; + for (size_t i = 1; i < exprVec.size(); i++) { + auto const& treeNode = + tree.push(Operation{Op::NULL_LOGICAL_OR, *result, *exprVec[i]}); + result = &treeNode; + } + return *result; } else if (name == "cast" || name == "try_cast") { - VELOX_CHECK_EQ(len, 1); - auto const& op1 = pushExprToTree(expr->inputs()[0]); - if (expr->type()->kind() == TypeKind::INTEGER) { - // No int32 cast in cudf ast - return tree.push(Operation{Op::CAST_TO_INT64, op1}); - } else if (expr->type()->kind() == TypeKind::BIGINT) { - return tree.push(Operation{Op::CAST_TO_INT64, op1}); - } else if (expr->type()->kind() == TypeKind::DOUBLE) { - return tree.push(Operation{Op::CAST_TO_FLOAT64, op1}); - } else { - VELOX_FAIL("Unsupported type for cast operation"); - } + VELOX_CHECK_EQ(len, 1); + auto const& op1 = pushExprToTree(expr->inputs()[0]); + if (expr->type()->kind() == TypeKind::INTEGER) { + // No int32 cast in cudf ast + return tree.push(Operation{Op::CAST_TO_INT64, op1}); + } else if (expr->type()->kind() == TypeKind::BIGINT) { + return tree.push(Operation{Op::CAST_TO_INT64, op1}); + } else if (expr->type()->kind() == TypeKind::DOUBLE) { + return tree.push(Operation{Op::CAST_TO_FLOAT64, op1}); + } else { + VELOX_FAIL("Unsupported type for cast operation"); + } } else if (auto fieldExpr = std::dynamic_pointer_cast(expr)) { - // Refer to the appropriate side - const auto fieldName = - fieldExpr->inputs().empty() ? 
name : fieldExpr->inputs()[0]->name(); - for (size_t sideIdx = 0; sideIdx < inputRowSchema.size(); ++sideIdx) { - auto& schema = inputRowSchema[sideIdx]; - if (schema.get()->containsChild(fieldName)) { - auto columnIndex = schema.get()->getChildIdx(fieldName); - // This column may be complex data type like ROW, we need to get the - // name from row. Push fieldName.name to the tree. - auto side = static_cast(sideIdx); - if (fieldExpr->field() == fieldName) { - return tree.push(cudf::ast::column_reference(columnIndex, side)); - } else if (!allowPureAstOnly) { - return addPrecomputeInstruction( - fieldName, "nested_column", fieldExpr->field()); - } else { - VELOX_FAIL("Unsupported type for nested column operation"); + // Refer to the appropriate side + const auto fieldName = + fieldExpr->inputs().empty() ? name : fieldExpr->inputs()[0]->name(); + for (size_t sideIdx = 0; sideIdx < inputRowSchema.size(); ++sideIdx) { + auto& schema = inputRowSchema[sideIdx]; + if (schema.get()->containsChild(fieldName)) { + auto columnIndex = schema.get()->getChildIdx(fieldName); + // This column may be complex data type like ROW, we need to get the + // name from row. Push fieldName.name to the tree. 
+ auto side = static_cast(sideIdx); + if (fieldExpr->field() == fieldName) { + return tree.push(cudf::ast::column_reference(columnIndex, side)); + } else if (!allowPureAstOnly) { + return addPrecomputeInstruction( + fieldName, "nested_column", fieldExpr->field()); + } else { + VELOX_FAIL("Unsupported type for nested column operation"); + } } } - } - VELOX_FAIL("Field not found, " + name); + VELOX_FAIL("Field not found, " + name); } else if (!allowPureAstOnly && canBeEvaluatedByCudf(expr, /*deep=*/false)) { - // Shallow check: only verify this operation is supported - // Children will be recursively handled by createCudfExpression - // Determine which side this expression references - int sideIdx = findExpressionSide(expr); - if (sideIdx < 0) { - sideIdx = 0; // Default to left side if no fields found - } - auto node = - createCudfExpression(expr, inputRowSchema[sideIdx], kAstEvaluatorName); - return addPrecomputeInstructionOnSide(sideIdx, 0, name, "", node); + // Shallow check: only verify this operation is supported + // Children will be recursively handled by createCudfExpression + // Determine which side this expression references + int sideIdx = findExpressionSide(expr); + if (sideIdx < 0) { + sideIdx = 0; // Default to left side if no fields found + } + auto node = createCudfExpression( + expr, inputRowSchema[sideIdx], kAstEvaluatorName); + return addPrecomputeInstructionOnSide(sideIdx, 0, name, "", node); } else { - VELOX_FAIL("Unsupported expression: " + name); + VELOX_FAIL("Unsupported expression: " + name); + } } -} -int AstContext::findExpressionSide( - const std::shared_ptr& expr) const { - for (const auto* field : expr->distinctFields()) { - for (size_t sideIdx = 0; sideIdx < inputRowSchema.size(); ++sideIdx) { - if (inputRowSchema[sideIdx].get()->containsChild(field->field())) { - return static_cast(sideIdx); + int AstContext::findExpressionSide( + const std::shared_ptr& expr) const { + for (const auto* field : expr->distinctFields()) { + for (size_t 
sideIdx = 0; sideIdx < inputRowSchema.size(); ++sideIdx) { + if (inputRowSchema[sideIdx].get()->containsChild(field->field())) { + return static_cast(sideIdx); + } } } + return -1; } - return -1; -} -std::vector precomputeSubexpressions( - const std::vector& inputColumnViews, - const std::vector& precomputeInstructions, - const std::vector>& scalars, - const RowTypePtr& inputRowSchema, - rmm::cuda_stream_view stream) { - std::vector precomputedColumns; - precomputedColumns.reserve(precomputeInstructions.size()); - - for (const auto& instruction : precomputeInstructions) { - auto - [dependent_column_index, - ins_name, - new_column_index, - nested_dependent_column_indices, - cudf_expression] = instruction; - - // If a compiled cudf node is available, evaluate it directly. - if (cudf_expression) { - auto result = cudf_expression->eval( - inputColumnViews, - stream, - get_output_mr(), - /*finalize=*/true); - precomputedColumns.push_back(std::move(result)); - continue; - } - if (ins_name.rfind("fill", 0) == 0) { - auto scalarIndex = - std::stoi(ins_name.substr(5)); // "fill " is 5 characters - auto newColumn = cudf::make_column_from_scalar( - *scalars[scalarIndex], - inputColumnViews[dependent_column_index].size(), - stream, - get_output_mr()); - precomputedColumns.push_back(std::move(newColumn)); - } else if (ins_name == "nested_column") { - // Nested column already exists in input. Don't materialize. 
- auto view = inputColumnViews[dependent_column_index].child( - nested_dependent_column_indices[0]); - precomputedColumns.push_back(view); - } else { - VELOX_FAIL("Unsupported precompute operation " + ins_name); + std::vector precomputeSubexpressions( + const std::vector& inputColumnViews, + const std::vector& precomputeInstructions, + const std::vector>& scalars, + const RowTypePtr& inputRowSchema, + rmm::cuda_stream_view stream) { + std::vector precomputedColumns; + precomputedColumns.reserve(precomputeInstructions.size()); + + for (const auto& instruction : precomputeInstructions) { + auto + [dependent_column_index, + ins_name, + new_column_index, + nested_dependent_column_indices, + cudf_expression] = instruction; + + // If a compiled cudf node is available, evaluate it directly. + if (cudf_expression) { + auto result = cudf_expression->eval( + inputColumnViews, + stream, + get_output_mr(), + /*finalize=*/true); + precomputedColumns.push_back(std::move(result)); + continue; + } + if (ins_name.rfind("fill", 0) == 0) { + auto scalarIndex = + std::stoi(ins_name.substr(5)); // "fill " is 5 characters + auto newColumn = cudf::make_column_from_scalar( + *scalars[scalarIndex], + inputColumnViews[dependent_column_index].size(), + stream, + get_output_mr()); + precomputedColumns.push_back(std::move(newColumn)); + } else if (ins_name == "nested_column") { + // Nested column already exists in input. Don't materialize. 
+ auto view = inputColumnViews[dependent_column_index].child( + nested_dependent_column_indices[0]); + precomputedColumns.push_back(view); + } else { + VELOX_FAIL("Unsupported precompute operation " + ins_name); + } } - } - return precomputedColumns; -} + return precomputedColumns; + } } // namespace } // namespace facebook::velox::cudf_velox diff --git a/velox/experimental/cudf/expression/ExpressionEvaluator.cpp b/velox/experimental/cudf/expression/ExpressionEvaluator.cpp index 1c0e73c4c89..2e4a56f7483 100644 --- a/velox/experimental/cudf/expression/ExpressionEvaluator.cpp +++ b/velox/experimental/cudf/expression/ExpressionEvaluator.cpp @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#include "velox/experimental/cudf/common/CudfSystemConfig.h" #include "velox/experimental/cudf/exec/Validation.h" #include "velox/experimental/cudf/exec/VeloxCudfInterop.h" #include "velox/experimental/cudf/expression/AstUtils.h" @@ -1121,7 +1122,7 @@ bool registerBuiltinFunctions(const std::string& prefix) { // Cast needs special handling dynamically using cudf. }); - if (CudfConfig::getInstance().functionEngine == "spark") { + if (CudfSystemConfig::getInstance().functionEngine() == "spark") { registerSparkFunctions(prefix); } else { registerPrestoFunctions(prefix); diff --git a/velox/experimental/cudf/tests/AggregationTest.cpp b/velox/experimental/cudf/tests/AggregationTest.cpp index a16c2d26d34..897b67deb4f 100644 --- a/velox/experimental/cudf/tests/AggregationTest.cpp +++ b/velox/experimental/cudf/tests/AggregationTest.cpp @@ -14,7 +14,7 @@ * limitations under the License. 
*/ -#include "velox/experimental/cudf/CudfConfig.h" +#include "velox/experimental/cudf/common/CudfSystemConfig.h" #include "velox/experimental/cudf/exec/ToCudf.h" #include "velox/dwio/common/tests/utils/BatchMaker.h" @@ -41,7 +41,8 @@ class AggregationTest : public OperatorTestBase { void SetUp() override { OperatorTestBase::SetUp(); filesystems::registerLocalFileSystem(); - cudf_velox::CudfConfig::getInstance().allowCpuFallback = false; + cudf_velox::CudfSystemConfig::getInstance().updateConfigs( + {{cudf_velox::CudfSystemConfig::kCudfAllowCpuFallback, "false"}}); cudf_velox::registerCudf(); } diff --git a/velox/experimental/cudf/tests/BatchConcatTest.cpp b/velox/experimental/cudf/tests/BatchConcatTest.cpp index ed19de4bb8e..5e891c28e37 100644 --- a/velox/experimental/cudf/tests/BatchConcatTest.cpp +++ b/velox/experimental/cudf/tests/BatchConcatTest.cpp @@ -14,7 +14,7 @@ * limitations under the License. */ -#include "velox/experimental/cudf/CudfConfig.h" +#include "velox/experimental/cudf/common/CudfSystemConfig.h" #include "velox/experimental/cudf/exec/ToCudf.h" #include "velox/exec/PlanNodeStats.h" @@ -31,20 +31,24 @@ class CudfBatchConcatTest : public OperatorTestBase { protected: void SetUp() override { OperatorTestBase::SetUp(); - CudfConfig::getInstance().debugEnabled = true; + CudfSystemConfig::getInstance().set( + CudfSystemConfig::kCudfDebugEnabled, "true"); cudf_velox::registerCudf(); } void TearDown() override { - CudfConfig::getInstance().concatOptimizationEnabled = false; + CudfSystemConfig::getInstance().set( + CudfSystemConfig::kCudfConcatOptimizationEnabled, "false"); cudf_velox::unregisterCudf(); OperatorTestBase::TearDown(); } void updateCudfConfig(int32_t min, std::optional max) { - auto& config = CudfConfig::getInstance(); - config.batchSizeMinThreshold = min; - config.batchSizeMaxThreshold = max; + auto& config = CudfSystemConfig::getInstance(); + config.set( + CudfSystemConfig::kCudfBatchSizeMinThreshold, std::to_string(min)); + config.set( + 
CudfSystemConfig::kCudfBatchSizeMaxThreshold, std::to_string(max)); } template @@ -89,7 +93,8 @@ TEST_F(CudfBatchConcatTest, concatReducesBatchesBeforeAggregation) { // With min threshold 30, concat should accumulate ~3 batches before flushing, // producing fewer output batches than the 6 it received. updateCudfConfig(/*min=*/30, /*max=*/std::nullopt); - CudfConfig::getInstance().concatOptimizationEnabled = true; + CudfSystemConfig::getInstance().set( + CudfSystemConfig::kCudfConcatOptimizationEnabled, "true"); std::vector vectors; for (int i = 0; i < 6; ++i) { @@ -130,7 +135,8 @@ TEST_F(CudfBatchConcatTest, concatReducesBatchesBeforeAggregation) { // disabled, even when aggregation is present. TEST_F(CudfBatchConcatTest, concatNotInsertedWhenDisabled) { updateCudfConfig(/*min=*/30, /*max=*/std::nullopt); - CudfConfig::getInstance().concatOptimizationEnabled = false; + CudfSystemConfig::getInstance().set( + CudfSystemConfig::kCudfConcatOptimizationEnabled, "false"); std::vector vectors; for (int i = 0; i < 6; ++i) { @@ -164,7 +170,8 @@ TEST_F(CudfBatchConcatTest, concatNotInsertedWhenDisabled) { // and flushes them as a single merged batch on noMoreInput. TEST_F(CudfBatchConcatTest, concatMergesAllOnFlushWithHighThreshold) { updateCudfConfig(/*min=*/100000, /*max=*/std::nullopt); - CudfConfig::getInstance().concatOptimizationEnabled = true; + CudfSystemConfig::getInstance().set( + CudfSystemConfig::kCudfConcatOptimizationEnabled, "true"); std::vector vectors; for (int i = 0; i < 6; ++i) { @@ -203,7 +210,8 @@ TEST_F(CudfBatchConcatTest, concatMergesAllOnFlushWithHighThreshold) { // Verifies correctness with grouped aggregation (non-global) and concat. 
TEST_F(CudfBatchConcatTest, concatWithGroupedAggregation) { updateCudfConfig(/*min=*/30, /*max=*/std::nullopt); - CudfConfig::getInstance().concatOptimizationEnabled = true; + CudfSystemConfig::getInstance().set( + CudfSystemConfig::kCudfConcatOptimizationEnabled, "true"); std::vector vectors; for (int i = 0; i < 6; ++i) { diff --git a/velox/experimental/cudf/tests/ConfigTest.cpp b/velox/experimental/cudf/tests/ConfigTest.cpp index f8344ffd709..ef247ead56e 100644 --- a/velox/experimental/cudf/tests/ConfigTest.cpp +++ b/velox/experimental/cudf/tests/ConfigTest.cpp @@ -14,7 +14,7 @@ * limitations under the License. */ -#include "velox/experimental/cudf/CudfConfig.h" +#include "velox/experimental/cudf/common/CudfSystemConfig.h" #include @@ -22,20 +22,32 @@ namespace facebook::velox::cudf_velox::test { TEST(ConfigTest, CudfConfig) { std::unordered_map options = { - {CudfConfig::kCudfEnabled, "false"}, - {CudfConfig::kCudfDebugEnabled, "true"}, - {CudfConfig::kCudfMemoryResource, "arena"}, - {CudfConfig::kCudfMemoryPercent, "25"}, - {CudfConfig::kCudfFunctionNamePrefix, "presto"}, - {CudfConfig::kCudfAllowCpuFallback, "false"}}; + {CudfSystemConfig::kCudfAllowCpuFallback, "false"}, + {CudfSystemConfig::kCudfDebugEnabled, "true"}, + {CudfSystemConfig::kCudfLogFallback, "true"}, + {CudfSystemConfig::kCudfMemoryResource, "arena"}, + {CudfSystemConfig::kCudfMemoryPercent, "25"}, + {CudfSystemConfig::kCudfFunctionNamePrefix, "presto"}, + {CudfSystemConfig::kCudfEnabled, "false"}, + {CudfSystemConfig::kCudfAllowCpuFallback, "false"}, + {CudfSystemConfig::kCudfTopNBatchSize, "10"}}; - CudfConfig config; - config.initialize(std::move(options)); - ASSERT_EQ(config.enabled, false); - ASSERT_EQ(config.debugEnabled, true); - ASSERT_EQ(config.memoryResource, "arena"); - ASSERT_EQ(config.memoryPercent, 25); - ASSERT_EQ(config.functionNamePrefix, "presto"); - ASSERT_EQ(config.allowCpuFallback, false); + CudfSystemConfig config(std::move(options)); + 
config.updateConfigs(std::move(options)); + ASSERT_EQ(config.cudfEnabled(), false); + ASSERT_EQ(config.memoryResource(), "arena"); + ASSERT_EQ(config.memoryPercent(), 25); + ASSERT_EQ(config.functionNamePrefix(), "presto"); + ASSERT_EQ(config.allowCpuFallback(), false); + ASSERT_EQ(config.debugEnabled(), true); + ASSERT_EQ(config.logFallback(), true); + ASSERT_EQ(config.topNBatchSize(), 10); +} + +TEST(ConfigTest, CudfSystemConfigLegacyUnderscoreKeys) { + CudfSystemConfig config; + config.updateConfigs({{"cudf.allow_cpu_fallback", "false"}}); + + ASSERT_EQ(config.allowCpuFallback(), false); } } // namespace facebook::velox::cudf_velox::test diff --git a/velox/experimental/cudf/tests/ExpressionEvaluatorSelectionTest.cpp b/velox/experimental/cudf/tests/ExpressionEvaluatorSelectionTest.cpp index cb3c19dee4b..b6d4890d0ec 100644 --- a/velox/experimental/cudf/tests/ExpressionEvaluatorSelectionTest.cpp +++ b/velox/experimental/cudf/tests/ExpressionEvaluatorSelectionTest.cpp @@ -14,7 +14,7 @@ * limitations under the License. 
*/ -#include "velox/experimental/cudf/CudfConfig.h" +#include "velox/experimental/cudf/common/CudfSystemConfig.h" #include "velox/experimental/cudf/exec/ToCudf.h" #include "velox/experimental/cudf/expression/AstExpression.h" #include "velox/experimental/cudf/expression/ExpressionEvaluator.h" @@ -48,7 +48,8 @@ class CudfExpressionSelectionTest : public ::testing::Test { pool_ = memory::memoryManager()->addLeafPool("", false); queryCtx_ = core::QueryCtx::create(); execCtx_ = std::make_unique(pool_.get(), queryCtx_.get()); - CudfConfig::getInstance().functionEngine = "spark"; + CudfSystemConfig::getInstance().set( + CudfSystemConfig::kCudfFunctionEngine, "spark"); cudf_velox::registerCudf(); rowType_ = ROW({ {"a", BIGINT()}, @@ -76,17 +77,21 @@ class CudfExpressionSelectionTest : public ::testing::Test { }; TEST_F(CudfExpressionSelectionTest, astRoot) { - auto prevAst = CudfConfig::getInstance().astExpressionEnabled; - auto prevJit = CudfConfig::getInstance().jitExpressionEnabled; - CudfConfig::getInstance().astExpressionEnabled = true; - CudfConfig::getInstance().jitExpressionEnabled = true; + auto prevAst = CudfSystemConfig::getInstance().astExpressionEnabled(); + auto prevJit = CudfSystemConfig::getInstance().jitExpressionEnabled(); + CudfSystemConfig::getInstance().updateConfigs( + {{CudfSystemConfig::kCudfAstExpressionEnabled, "true"}, + {CudfSystemConfig::kCudfJitExpressionEnabled, "true"}}); auto expr = compileExecExpr("a + c", rowType_, execCtx_.get()); auto cudfExpr = createCudfExpression(expr, rowType_); auto* ast = dynamic_cast(cudfExpr.get()); auto* jit = dynamic_cast(cudfExpr.get()); ASSERT_TRUE(ast != nullptr || jit != nullptr); - CudfConfig::getInstance().astExpressionEnabled = prevAst; - CudfConfig::getInstance().jitExpressionEnabled = prevJit; + CudfSystemConfig::getInstance().updateConfigs( + {{CudfSystemConfig::kCudfAstExpressionEnabled, + prevAst ? "true" : "false"}, + {CudfSystemConfig::kCudfJitExpressionEnabled, + prevJit ? 
"true" : "false"}}); } TEST_F(CudfExpressionSelectionTest, functionRoot) { @@ -98,10 +103,11 @@ TEST_F(CudfExpressionSelectionTest, functionRoot) { } TEST_F(CudfExpressionSelectionTest, astTopLevelWithFunctionPrecompute) { - auto prevAst = CudfConfig::getInstance().astExpressionEnabled; - auto prevJit = CudfConfig::getInstance().jitExpressionEnabled; - CudfConfig::getInstance().astExpressionEnabled = true; - CudfConfig::getInstance().jitExpressionEnabled = true; + auto prevAst = CudfSystemConfig::getInstance().astExpressionEnabled(); + auto prevJit = CudfSystemConfig::getInstance().jitExpressionEnabled(); + CudfSystemConfig::getInstance().updateConfigs( + {{CudfSystemConfig::kCudfAstExpressionEnabled, "true"}, + {CudfSystemConfig::kCudfJitExpressionEnabled, "true"}}); auto expr = compileExecExpr( "(year(date) > 2020) AND (length(name) < 10)", rowType_, execCtx_.get()); ASSERT_TRUE(canBeEvaluatedByCudf(expr, /*deep=*/false)); @@ -109,8 +115,11 @@ TEST_F(CudfExpressionSelectionTest, astTopLevelWithFunctionPrecompute) { auto* ast = dynamic_cast(cudfExpr.get()); auto* jit = dynamic_cast(cudfExpr.get()); ASSERT_TRUE(ast != nullptr || jit != nullptr); - CudfConfig::getInstance().astExpressionEnabled = prevAst; - CudfConfig::getInstance().jitExpressionEnabled = prevJit; + CudfSystemConfig::getInstance().updateConfigs( + {{CudfSystemConfig::kCudfAstExpressionEnabled, + prevAst ? "true" : "false"}, + {CudfSystemConfig::kCudfJitExpressionEnabled, + prevJit ? "true" : "false"}}); } TEST_F(CudfExpressionSelectionTest, functionTopLevelWithNestedFunction) { @@ -248,7 +257,8 @@ TEST_F(CudfExpressionSelectionTest, DISABLED_castAndTryCast) { // TODO (dm): This is required for passing of castAndTryCast test but breaks // others. This is because ASTExpr agrees to support bad casts. 
remove after // ASTExpr checks cast types - // CudfConfig::getInstance().astExpressionEnabled = false; + // CudfSystemConfig::getInstance().set( + // CudfSystemConfig::kCudfAstExpressionEnabled, "false"); // OK: cast bigint -> double (supported by cuDF) auto okCast = compileExecExpr("cast(a AS double)", rowType_, execCtx_.get()); diff --git a/velox/experimental/cudf/tests/FilterProjectTest.cpp b/velox/experimental/cudf/tests/FilterProjectTest.cpp index 3b43fc84427..ba0df195ee5 100644 --- a/velox/experimental/cudf/tests/FilterProjectTest.cpp +++ b/velox/experimental/cudf/tests/FilterProjectTest.cpp @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#include "velox/experimental/cudf/CudfConfig.h" +#include "velox/experimental/cudf/common/CudfSystemConfig.h" #include "velox/experimental/cudf/exec/ToCudf.h" #include "velox/experimental/cudf/tests/CudfFunctionBaseTest.h" @@ -43,7 +43,8 @@ class CudfFilterProjectTest : public OperatorTestBase { void SetUp() override { OperatorTestBase::SetUp(); filesystems::registerLocalFileSystem(); - cudf_velox::CudfConfig::getInstance().allowCpuFallback = false; + cudf_velox::CudfSystemConfig::getInstance().set( + cudf_velox::CudfSystemConfig::kCudfAllowCpuFallback, "false"); cudf_velox::registerCudf(); rng_.seed(123); diff --git a/velox/experimental/cudf/tests/HashJoinTest.cpp b/velox/experimental/cudf/tests/HashJoinTest.cpp index 8af0211e7d4..41fbda550bf 100644 --- a/velox/experimental/cudf/tests/HashJoinTest.cpp +++ b/velox/experimental/cudf/tests/HashJoinTest.cpp @@ -14,7 +14,7 @@ * limitations under the License. 
*/ -#include "velox/experimental/cudf/CudfConfig.h" +#include "velox/experimental/cudf/common/CudfSystemConfig.h" #include "velox/experimental/cudf/exec/ToCudf.h" #include "folly/experimental/EventCount.h" @@ -56,7 +56,8 @@ class HashJoinTest : public HashJoinTestBase { void SetUp() override { HashJoinTestBase::SetUp(); - cudf_velox::CudfConfig::getInstance().allowCpuFallback = false; + cudf_velox::CudfSystemConfig::getInstance().set( + cudf_velox::CudfSystemConfig::kCudfAllowCpuFallback, "false"); cudf_velox::registerCudf(); } diff --git a/velox/experimental/cudf/tests/TopNTest.cpp b/velox/experimental/cudf/tests/TopNTest.cpp index 6441e7145dc..65604557ab0 100644 --- a/velox/experimental/cudf/tests/TopNTest.cpp +++ b/velox/experimental/cudf/tests/TopNTest.cpp @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#include "velox/experimental/cudf/CudfConfig.h" +#include "velox/experimental/cudf/common/CudfSystemConfig.h" #include "velox/experimental/cudf/exec/CudfConversion.h" #include "velox/experimental/cudf/exec/ToCudf.h" @@ -284,7 +284,7 @@ TEST_F(TopNTest, numericTopNSynchronization) { // sync. AssertQueryBuilder(plan, duckDbQueryRunner_) .config(cudf_velox::CudfFromVelox::kGpuBatchSizeRows, batchSize) - .config(cudf_velox::CudfConfig::kCudfTopNBatchSize, 1) + .config(cudf_velox::CudfSystemConfig::kCudfTopNBatchSize, 1) .assertResults( fmt::format( "SELECT c0, c1, c2 FROM tmp ORDER BY c1 DESC, c0 LIMIT {}", diff --git a/velox/experimental/cudf/tests/sparksql/FilterProjectTest.cpp b/velox/experimental/cudf/tests/sparksql/FilterProjectTest.cpp index 327c42418b5..e0700a3323d 100644 --- a/velox/experimental/cudf/tests/sparksql/FilterProjectTest.cpp +++ b/velox/experimental/cudf/tests/sparksql/FilterProjectTest.cpp @@ -14,7 +14,7 @@ * limitations under the License. 
*/ -#include "velox/experimental/cudf/CudfConfig.h" +#include "velox/experimental/cudf/common/CudfSystemConfig.h" #include "velox/experimental/cudf/exec/CudfFilterProject.h" #include "velox/experimental/cudf/exec/ToCudf.h" #include "velox/experimental/cudf/tests/CudfFunctionBaseTest.h" @@ -39,7 +39,8 @@ class CudfFilterProjectTest : public CudfFunctionBaseTest { parse::registerTypeResolver(); functions::sparksql::registerFunctions(""); memory::MemoryManager::testingSetInstance(memory::MemoryManager::Options{}); - CudfConfig::getInstance().functionEngine = "spark"; + CudfSystemConfig::getInstance().updateConfigs( + {{CudfSystemConfig::kCudfFunctionEngine, "spark"}}); cudf_velox::registerCudf(); }