Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ jobs:
github.event_name == 'schedule' || needs.changes.outputs.codechange == 'true'
run: |
export PRESTO_SERVER_PATH="${GITHUB_WORKSPACE}/presto-native-execution/_build/release/presto_cpp/main/presto_server"
export TESTFILES=`find ./presto-native-tests/src/test -type f -name 'Test*.java'`
export TESTFILES=`find ./presto-native-tests/src/test -type f -name 'Test*.java' ! -path "*/presto-native-tests/src/test/java/com/facebook/presto/nativetests/cudf/*"`
# Convert file paths to comma separated class names
export TESTCLASSES=
for test_file in $TESTFILES
Expand Down
2 changes: 1 addition & 1 deletion .gitmodules
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[submodule "presto-native-execution/velox"]
path = presto-native-execution/velox
url = https://github.com/facebookincubator/velox.git
url = https://github.com/pramodsatya/velox.git
10 changes: 5 additions & 5 deletions presto-native-execution/presto_cpp/main/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@ add_subdirectory(thrift)
add_subdirectory(connectors)
add_subdirectory(functions)
add_subdirectory(tool)

add_library(presto_session_properties SessionProperties.cpp)

target_link_libraries(presto_session_properties ${FOLLY_WITH_DEPENDENCIES})
add_subdirectory(properties)

add_library(
presto_server_lib
Expand Down Expand Up @@ -66,7 +63,6 @@ target_link_libraries(
presto_connectors
presto_http
presto_operators
presto_session_properties
presto_velox_plan_conversion
presto_hive_functions
presto_theta_sketch_functions
Expand Down Expand Up @@ -112,6 +108,10 @@ target_link_libraries(
pthread
)

# presto_cudf_session_properties is always linked: velox_cudf_config is plain
# C++ (no CUDA), enabling the sidecar to expose cuDF session properties.
target_link_libraries(presto_server_lib presto_cudf_session_properties)

if(PRESTO_ENABLE_CUDF)
target_link_libraries(presto_server_lib velox_cudf_exec)
endif()
Expand Down
38 changes: 26 additions & 12 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include "presto_cpp/main/CoordinatorDiscoverer.h"
#include "presto_cpp/main/PeriodicMemoryChecker.h"
#include "presto_cpp/main/PeriodicTaskManager.h"
#include "presto_cpp/main/SessionProperties.h"
#include "presto_cpp/main/SignalHandler.h"
#include "presto_cpp/main/TaskResource.h"
#include "presto_cpp/main/common/ConfigReader.h"
Expand All @@ -44,6 +43,7 @@
#include "presto_cpp/main/operators/ShuffleExchangeSource.h"
#include "presto_cpp/main/operators/ShuffleRead.h"
#include "presto_cpp/main/operators/ShuffleWrite.h"
#include "presto_cpp/main/properties/session/SessionProperties.h"
#include "presto_cpp/main/types/ExpressionOptimizer.h"
#include "presto_cpp/main/types/PrestoToVeloxQueryPlan.h"
#include "presto_cpp/main/types/VeloxPlanConversion.h"
Expand Down Expand Up @@ -76,8 +76,11 @@
#include "velox/serializers/PrestoSerializer.h"
#include "velox/serializers/UnsafeRowSerializer.h"

// CudfSessionProperties and CudfConfig are plain C++ (no CUDA required).
// Always include them so the sidecar session endpoint exposes cuDF properties.
#include "presto_cpp/main/properties/session/CudfSessionProperties.h"
#include "velox/experimental/cudf/common/CudfConfig.h"
#ifdef PRESTO_ENABLE_CUDF
#include "velox/experimental/cudf/CudfConfig.h"
#include "velox/experimental/cudf/exec/ToCudf.h"
#endif

Expand Down Expand Up @@ -172,16 +175,15 @@ bool isSharedLibrary(const fs::path& path) {

void registerVeloxCudf() {
#ifdef PRESTO_ENABLE_CUDF
// Disable by default.
velox::cudf_velox::CudfConfig::getInstance().enabled = false;
auto systemConfig = SystemConfig::instance();
velox::cudf_velox::CudfConfig::getInstance().functionNamePrefix =
systemConfig->prestoDefaultNamespacePrefix();
if (systemConfig->values().contains(
velox::cudf_velox::CudfConfig::kCudfEnabled)) {
velox::cudf_velox::CudfConfig::getInstance().initialize(
velox::cudf_velox::CudfSystemConfig::kCudfEnabledEntry.name)) {
velox::cudf_velox::CudfSystemConfig::getInstance().updateConfigs(
systemConfig->values());
if (velox::cudf_velox::CudfConfig::getInstance().enabled) {
/// TODO(ps): Deprecate cudf.enabled from query config.
if (velox::cudf_velox::CudfSystemConfig::getInstance().cudfEnabled() ||
velox::cudf_velox::CudfQueryConfig::getInstance().get<bool>(
velox::cudf_velox::CudfQueryConfig::kCudfEnabledEntry)) {
velox::cudf_velox::registerCudf();
PRESTO_STARTUP_LOG(INFO) << "cuDF is registered.";
}
Expand All @@ -193,8 +195,9 @@ void unregisterVeloxCudf() {
#ifdef PRESTO_ENABLE_CUDF
auto systemConfig = SystemConfig::instance();
if (systemConfig->values().contains(
velox::cudf_velox::CudfConfig::kCudfEnabled) &&
velox::cudf_velox::CudfConfig::getInstance().enabled) {
velox::cudf_velox::CudfSystemConfig::kCudfEnabledEntry.name) &&
velox::cudf_velox::CudfSystemConfig::getInstance().get<bool>(
velox::cudf_velox::CudfSystemConfig::kCudfEnabledEntry)) {
velox::cudf_velox::unregisterCudf();
PRESTO_SHUTDOWN_LOG(INFO) << "cuDF is unregistered.";
}
Expand Down Expand Up @@ -419,6 +422,15 @@ void PrestoServer::initializeConfigs() {
nodeLocation_ = nodeConfig->nodeLocation();
nodePoolType_ = systemConfig->poolType();
prestoBuiltinFunctionPrefix_ = systemConfig->prestoDefaultNamespacePrefix();

#ifdef PRESTO_ENABLE_CUDF
// Set default values for cuDF in system config
velox::cudf_velox::CudfSystemConfig::getInstance().set(
velox::cudf_velox::CudfSystemConfig::kCudfEnabledEntry.name, "false");
velox::cudf_velox::CudfSystemConfig::getInstance().set(
velox::cudf_velox::CudfSystemConfig::kCudfFunctionNamePrefixEntry.name,
prestoBuiltinFunctionPrefix_);
#endif
} catch (const velox::VeloxUserError& e) {
PRESTO_STARTUP_LOG(ERROR) << "Failed to start server due to " << e.what();
exit(EXIT_FAILURE);
Expand Down Expand Up @@ -1830,9 +1842,11 @@ void PrestoServer::registerSidecarEndpoints() {
proxygen::HTTPMessage* /*message*/,
const std::vector<std::unique_ptr<folly::IOBuf>>& /*body*/,
proxygen::ResponseHandler* downstream) {
const auto* sessionProperties = SessionProperties::instance();
const auto* sessionProperties =
facebook::presto::cudf::CudfSessionProperties::instance();
http::sendOkResponse(downstream, sessionProperties->serialize());
});

httpServer_->registerGet(
"/v1/functions",
[](proxygen::HTTPMessage* /*message*/,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@
*/

#include "presto_cpp/main/PrestoToVeloxQueryConfig.h"
#include "presto_cpp/main/SessionProperties.h"
#include <glog/logging.h>
#include "presto_cpp/main/common/Configs.h"
#include "presto_cpp/main/properties/session/CudfSessionProperties.h"
#include "presto_cpp/main/properties/session/SessionProperties.h"
#include "velox/common/compression/Compression.h"
#include "velox/core/QueryConfig.h"
#include "velox/experimental/cudf/common/CudfConfig.h"
#include "velox/type/tz/TimeZoneMap.h"

namespace facebook::presto {
Expand Down Expand Up @@ -239,8 +242,30 @@ void updateFromSystemConfigs(
}
}
}

} // namespace

/// Builds the query-level cuDF config from the system properties carried by
/// 'session'. Every session property name is translated through
/// CudfSessionProperties::toVeloxConfig(); properties whose translated name is
/// empty are dropped, and all others are forwarded under the translated name
/// with their session value.
velox::cudf_velox::CudfQueryConfig toCudfConfigs(
    const protocol::SessionRepresentation& session) {
  auto* propertyRegistry = cudf::CudfSessionProperties::instance();
  std::unordered_map<std::string, std::string> translatedConfigs;

  for (const auto& [propertyName, propertyValue] : session.systemProperties) {
    // Map the Presto session property name to its Velox config name.
    // NOTE(review): this relies on the cuDF config only processing keys it
    // recognizes, so unrelated entries are expected to be harmless - confirm.
    const auto configKey = propertyRegistry->toVeloxConfig(propertyName);
    if (configKey.empty()) {
      continue;
    }
    translatedConfigs[configKey] = propertyValue;
  }

  return velox::cudf_velox::CudfQueryConfig(std::move(translatedConfigs));
}

std::unordered_map<std::string, std::string> toVeloxConfigs(
const protocol::SessionRepresentation& session) {
std::unordered_map<std::string, std::string> configs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,21 @@ namespace facebook::velox::core {
class QueryConfig;
}

namespace facebook::velox::cudf_velox {
class CudfQueryConfig;
}

namespace facebook::presto {

/// Translates Presto configs to Velox 'QueryConfig' config map. Presto query
/// session properties take precedence over Presto system config properties.
std::unordered_map<std::string, std::string> toVeloxConfigs(
const protocol::SessionRepresentation& session);

/// Translates Presto session properties to Velox query level cuDF configs.
velox::cudf_velox::CudfQueryConfig toCudfConfigs(
const protocol::SessionRepresentation& session);

/// Translates Presto configs to Velox 'QueryConfig' config map. It is the
/// temporary overload that builds a QueryConfig from session properties and
/// extraCredentials, including all extraCredentials so they can be consumed by
Expand Down
15 changes: 13 additions & 2 deletions presto-native-execution/presto_cpp/main/QueryContextManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@

#include "presto_cpp/main/QueryContextManager.h"
#include <folly/executors/IOThreadPoolExecutor.h>
#include <glog/logging.h>
#include "presto_cpp/main/PrestoToVeloxQueryConfig.h"
#include "presto_cpp/main/SessionProperties.h"
#include "presto_cpp/main/common/Configs.h"
#include "presto_cpp/main/properties/session/SessionProperties.h"
#include "velox/connectors/hive/HiveConfig.h"
#include "velox/core/QueryConfig.h"
// CudfConfig is plain C++ (no CUDA) - always include for CudfQueryConfig type.
#include "velox/experimental/cudf/common/CudfConfig.h"

using namespace facebook::velox;

Expand Down Expand Up @@ -110,6 +113,7 @@ QueryContextManager::findOrCreateQueryCtx(
taskId,
toVeloxConfigs(
taskUpdateRequest.session, taskUpdateRequest.extraCredentials),
toCudfConfigs(taskUpdateRequest.session),
toConnectorConfigs(taskUpdateRequest));
}

Expand All @@ -122,6 +126,7 @@ QueryContextManager::findOrCreateBatchQueryCtx(
taskId,
toVeloxConfigs(
taskUpdateRequest.session, taskUpdateRequest.extraCredentials),
toCudfConfigs(taskUpdateRequest.session),
toConnectorConfigs(taskUpdateRequest));
if (queryCtx->pool()->aborted()) {
// In Batch mode, only one query is running at a time. When tasks fail
Expand All @@ -138,6 +143,7 @@ QueryContextManager::findOrCreateBatchQueryCtx(
taskId,
toVeloxConfigs(
taskUpdateRequest.session, taskUpdateRequest.extraCredentials),
toCudfConfigs(taskUpdateRequest.session),
toConnectorConfigs(taskUpdateRequest));
}
return queryCtx;
Expand All @@ -159,6 +165,7 @@ std::shared_ptr<core::QueryCtx>
QueryContextManager::createAndCacheQueryCtxLocked(
const QueryId& queryId,
velox::core::QueryConfig&& queryConfig,
velox::cudf_velox::CudfQueryConfig&& cudfConfigs,
std::unordered_map<std::string, std::shared_ptr<config::ConfigBase>>&&
connectorConfigs,
std::shared_ptr<memory::MemoryPool>&& pool) {
Expand All @@ -169,13 +176,16 @@ QueryContextManager::createAndCacheQueryCtxLocked(
cache::AsyncDataCache::getInstance(),
std::move(pool),
spillerExecutor_,
queryId);
queryId,
/* tokenProvider */ nullptr,
std::move(cudfConfigs));
return queryContextCache_.insert(queryId, std::move(queryCtx));
}

std::shared_ptr<core::QueryCtx> QueryContextManager::findOrCreateQueryCtxLocked(
const TaskId& taskId,
velox::core::QueryConfig&& queryConfig,
velox::cudf_velox::CudfQueryConfig&& cudfConfigs,
std::unordered_map<std::string, std::shared_ptr<config::ConfigBase>>&&
connectorConfigs) {
const QueryId queryId{queryIdFromTaskId(taskId)};
Expand Down Expand Up @@ -208,6 +218,7 @@ std::shared_ptr<core::QueryCtx> QueryContextManager::findOrCreateQueryCtxLocked(
return createAndCacheQueryCtxLocked(
queryId,
std::move(queryConfig),
std::move(cudfConfigs),
std::move(connectorConfigs),
std::move(pool));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
#include "presto_cpp/presto_protocol/core/presto_protocol_core.h"
#include "velox/core/QueryCtx.h"

namespace facebook::velox::cudf_velox {
class CudfQueryConfig;
}

namespace facebook::presto {
class QueryContextCache {
public:
Expand Down Expand Up @@ -112,6 +116,7 @@ class QueryContextManager {
virtual std::shared_ptr<velox::core::QueryCtx> createAndCacheQueryCtxLocked(
const protocol::QueryId& queryId,
velox::core::QueryConfig&& queryConfig,
velox::cudf_velox::CudfQueryConfig&& cudfConfigs,
std::unordered_map<
std::string,
std::shared_ptr<velox::config::ConfigBase>>&& connectorConfigs,
Expand All @@ -120,6 +125,7 @@ class QueryContextManager {
std::shared_ptr<velox::core::QueryCtx> findOrCreateQueryCtxLocked(
const protocol::TaskId& taskId,
velox::core::QueryConfig&& queryConfig,
velox::cudf_velox::CudfQueryConfig&& cudfConfigs,
std::unordered_map<
std::string,
std::shared_ptr<velox::config::ConfigBase>>&& connectorConfigStrings);
Expand Down
4 changes: 4 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@

namespace facebook::presto::util {

/// Returns the lowercase textual form of 'value': "true" or "false".
std::string boolToString(bool value) {
  if (value) {
    return "true";
  }
  return "false";
}

DateTime toISOTimestamp(uint64_t timeMilli) {
char buf[80];
time_t timeSecond = timeMilli / 1000;
Expand Down
3 changes: 3 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ namespace facebook::presto::util {
#define PRESTO_SHUTDOWN_LOG(severity) \
LOG(severity) << PRESTO_SHUTDOWN_LOG_PREFIX

/// Convert boolean to lowercase string representation.
std::string boolToString(bool value);

using DateTime = std::string;
DateTime toISOTimestamp(uint64_t timeMilli);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
#include "velox/connectors/tpcds/TpcdsConnector.h"
#include "velox/connectors/tpch/TpchConnector.h"
#ifdef PRESTO_ENABLE_CUDF
#include "velox/experimental/cudf/CudfConfig.h"
#include "velox/experimental/cudf/common/CudfConfig.h"
#include "velox/experimental/cudf/connectors/hive/CudfHiveConnector.h"
#endif

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Process the CMakeLists.txt in the session/ subdirectory.
add_subdirectory(session)
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Provider library that both session-property libraries below link against.
add_library(
  presto_session_properties_provider
  SessionPropertiesProvider.cpp
)

target_link_libraries(
  presto_session_properties_provider
  ${FOLLY_WITH_DEPENDENCIES}
  presto_protocol
  presto_common
)

# Presto session properties, layered on top of the provider library.
add_library(
  presto_session_properties
  SessionProperties.cpp
)

target_link_libraries(
  presto_session_properties
  presto_session_properties_provider
)

# cuDF session properties. This target is not gated on PRESTO_ENABLE_CUDF:
# velox_cudf_config is plain C++ with no CUDA requirement, so it is always
# built and the sidecar can expose cuDF session props.
add_library(
  presto_cudf_session_properties
  CudfSessionProperties.cpp
)

target_link_libraries(
  presto_cudf_session_properties
  presto_session_properties_provider
  velox_cudf_config
  ${FOLLY_WITH_DEPENDENCIES}
)

# Unit tests are only configured when testing is enabled.
if(PRESTO_ENABLE_TESTING)
  add_subdirectory(tests)
endif()
Loading
Loading